/*
 * =====================================================================================
 *
 *       Filename:  main.h
 *
 *    Description:  vector index helper main file.
 *
 *        Version:  1.0
 *        Created:  10/25/2021
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  chenyujie28, chenyujie28@jd.com
 *        Company:  JD.com, Inc.
 *
 * =====================================================================================
 */
#ifndef VECTOR_INDEX_HELPER_MAIN_
#define VECTOR_INDEX_HELPER_MAIN_

#include <regex>

#include "log.h"
#include "config_manager.h"
#include "faiss_index_factory.h"
#include "rocksdb_basic_operator.h"
#include "vet_system_status.h"
#include "vet_cmd_handler.h"
#include "agent_process.h"
#include "agent_listen_pkg.h"

using namespace vectorindex;

bool ParseVetKey(std::string s_key ,
             UniqueIndexFlag& o_uniq_index,
             std::vector<float>& o_vector_data)
{
    std::size_t i_fir_pos = s_key.find_first_of("#");
    if (i_fir_pos != std::string::npos) {
         o_uniq_index.i_appid_ = atoi(s_key.substr(0,i_fir_pos).c_str());
    }
    
    std::size_t i_sec_pos = s_key.find_last_of("#");
    if (i_sec_pos != std::string::npos) {
         o_uniq_index.i_field_id_ = atoi(s_key.substr(i_fir_pos + 1 , i_sec_pos - i_fir_pos - 1).c_str());
    }

    if (-1 == o_uniq_index.i_appid_ || -1 == o_uniq_index.i_field_id_) {
        log_error("rocksdb has illegel key");
        return false;
    }
    
    s_key = s_key.substr(i_sec_pos + 1);
    const char* split_flag = "|";
    char* p = strtok((char*)s_key.c_str() , split_flag);
    while (p != NULL) {
        o_vector_data.emplace_back(atof(p));
        p = strtok(NULL , split_flag);
    }

    return true;
};

bool ReloadDbToFaissRam()
{
    IndexFactory* p_index_fac = IndexFactory::Instance();
    rocksdb::Iterator* iter = RocksDbOperator::Instance()->GetIterator();
    for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
        UniqueIndexFlag o_uniq_idx_flag;
        std::vector<float> o_vet_array;
        bool b_ret = ParseVetKey(iter->key().ToString(), o_uniq_idx_flag, o_vet_array);
        if (false == b_ret) {
            return false;
        }

        std::vector<int64_t> o_ids;
        o_ids.emplace_back((int64_t)strtoll(iter->value().ToString().c_str(),NULL,10));
        
        int i_size = p_index_fac->GetIdxNum(o_uniq_idx_flag);
        for (int i = 0; i < i_size; i++) {
            o_uniq_idx_flag.i_index_typeid_ = i;
            if (!p_index_fac->CheckUniIdxIsValid(o_uniq_idx_flag)) {
                log_error("no index for appid:%d , fieldid:%d , index_type_id:%d" , 
                    o_uniq_idx_flag.i_appid_ , 
                    o_uniq_idx_flag.i_field_id_ ,
                    o_uniq_idx_flag.i_index_typeid_ );
                return false;
            }
            
            (*p_index_fac)[o_uniq_idx_flag]->InsertVet(
                        o_vet_array.data(),
                        o_ids.data());
        }
    }
    return true;
};

int main(int argc, char *argv[])
{
    stat_init_log_("vector_index","./config_error_log/");
    stat_set_log_level_(7);
    bool b_ret = ConfigManager::Instance()->InitConfig();
    if (!b_ret) { return -1; }
    
    const VectorIndexContext* p_vet_cnt = 
                ConfigManager::Instance()->GetVectorIndexTable();
    
    if (p_vet_cnt->b_daemon_) {
        daemon(1,0);
    }
    
    stat_init_log_(p_vet_cnt->s_service_name_.c_str(),
            p_vet_cnt->s_log_dir_.c_str());
    stat_set_log_level_(p_vet_cnt->ui_log_level_);
    log_debug("start service at [%s] [%s]", __DATE__, __TIME__);

    b_ret = IndexFactory::Instance()->InitIndex();
    if (!b_ret) { return -1; }

    b_ret = RocksDbOperator::Instance()->InitRocksDb(p_vet_cnt->s_rocksdb_dir_);
    if (!b_ret) { return -1; }

    std::string s_vet_id = "-1";
    int i_ret = RocksDbOperator::Instance()->GetEntry(
                        VET_CURRENT_NUM_KEY, 
                        s_vet_id , 
                        E_COL_FAM_META_DATA);
    if (0 == i_ret) { // rocksdb storage dir had existed , reload data into faiss RAM
        b_ret = ReloadDbToFaissRam();
        if (!b_ret) {
            log_error("reload db to faiss error.");
            return -1;
        }
    }

    SystemState::Instance()->SetVetId(s_vet_id);
    i_ret = SystemState::Instance()->AutoIncVetId();
    if (i_ret != 0) { 
        log_error("auto inc vet id error.");
        return -1;
    }
    
    CPollThread* worker_thread = new CPollThread("worker");
    if (worker_thread->InitializeThread() == -1) {
        log_error("init worker thread error.");
        return -1;
    }

    CmdHandler* p_vet_cmd_handler = new CmdHandler(worker_thread);
    CAgentProcess* p_agent_process = new CAgentProcess(worker_thread);
    p_agent_process->BindDispatcher(p_vet_cmd_handler);

    CAgentListenPkg* p_agent_listener = new CAgentListenPkg();
    int32_t listen_fd = 0;
    
    unlink(p_vet_cnt->s_local_socket_.c_str());
    if (p_agent_listener->Bind(p_vet_cnt->s_local_socket_.c_str(),
             p_agent_process, listen_fd) < 0) {
        log_error("bind agentListener error.");
        return -1;
    }
    worker_thread->RunningThread();
    p_agent_listener->Run();
    
    while (1) {
        sleep(20);
    }
    
    DELETE(p_agent_listener);

    return 0;
};

#endif